// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importinto

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/config/kerneltype"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/executor/importer"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/backend"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/lightning/checkpoints"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	verify "github.com/pingcap/tidb/pkg/lightning/verification"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/resourcegroup"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)

// MiniTaskExecutor is the interface for a minimal task executor.
// exported for testing.
type MiniTaskExecutor interface {
	Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter, collector execute.Collector) error
}

// importMinimalTaskExecutor is a minimal task executor for IMPORT INTO.
type importMinimalTaskExecutor struct {
	mTtask *importStepMinimalTask
}

var newImportMinimalTaskExecutor = newImportMinimalTaskExecutor0

func newImportMinimalTaskExecutor0(t *importStepMinimalTask) MiniTaskExecutor {
	return &importMinimalTaskExecutor{
		mTtask: t,
	}
}

func (e *importMinimalTaskExecutor) Run(
	ctx context.Context,
	dataWriter, indexWriter backend.EngineWriter,
	collector execute.Collector,
) error {
	logger := e.mTtask.logger
	failpoint.Inject("beforeSortChunk", func() {})
	failpoint.Inject("errorWhenSortChunk", func() {
		failpoint.Return(errors.New("occur an error when sort chunk"))
	})
	failpoint.InjectCall("syncBeforeSortChunk")
	sharedVars := e.mTtask.SharedVars

	chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk)
	chunkCheckpoint.FileMeta.ParquetMeta = mydump.ParquetFileMeta{
		Loc: sharedVars.TableImporter.Location,
	}

	checksum := verify.NewKVGroupChecksumWithKeyspace(sharedVars.TableImporter.GetKeySpace())
	if sharedVars.TableImporter.IsLocalSort() {
		if err := importer.ProcessChunk(
			ctx,
			&chunkCheckpoint,
			sharedVars.TableImporter,
			sharedVars.DataEngine,
			sharedVars.IndexEngine,
			logger,
			checksum,
			collector,
		); err != nil {
			return err
		}
	} else {
		if err := importer.ProcessChunkWithWriter(
			ctx,
			&chunkCheckpoint,
			sharedVars.TableImporter,
			dataWriter,
			indexWriter,
			logger,
			checksum,
			collector,
		); err != nil {
			return err
		}
	}

	sharedVars.mu.Lock()
	defer sharedVars.mu.Unlock()
	sharedVars.Checksum.Add(checksum)
	return nil
}

// postProcess does the post-processing for the task.
func (p *postProcessStepExecutor) postProcess(ctx context.Context, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
	failpoint.InjectCall("syncBeforePostProcess", p.taskMeta.JobID)

	callLog := log.BeginTask(logger, "post process")
	defer func() {
		callLog.End(zap.ErrorLevel, err)
	}()

	plan := &p.taskMeta.Plan
	if err = importer.RebaseAllocatorBases(ctx, p.store, subtaskMeta.MaxIDs, plan, logger); err != nil {
		return err
	}

	localChecksum := verify.NewKVGroupChecksumForAdd()
	for id, cksum := range subtaskMeta.Checksum {
		callLog.Info(
			"kv group checksum",
			zap.Int64("groupId", id),
			zap.Uint64("size", cksum.Size),
			zap.Uint64("kvs", cksum.KVs),
			zap.Uint64("checksum", cksum.Sum),
		)
		localChecksum.AddRawGroup(id, cksum.Size, cksum.KVs, cksum.Sum)
	}

	ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
	if kerneltype.IsNextGen() {
		bfWeight := importer.GetBackoffWeight(plan)
		mgr := local.NewTiKVChecksumManagerForImportInto(p.store, p.taskID,
			uint(plan.DistSQLScanConcurrency), bfWeight, resourcegroup.DefaultResourceGroupName)
		defer mgr.Close()
		return importer.VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), logger,
			func() (*local.RemoteChecksum, error) {
				ctxWithLogger := logutil.WithLogger(ctx, logger)
				return mgr.Checksum(ctxWithLogger, &checkpoints.TidbTableInfo{
					DB:   plan.DBName,
					Name: plan.TableInfo.Name.L,
					Core: plan.TableInfo,
				})
			},
		)
	}

	return p.taskTbl.WithNewSession(func(se sessionctx.Context) error {
		err = importer.VerifyChecksum(ctx, plan, localChecksum.MergedChecksum(), logger,
			func() (*local.RemoteChecksum, error) {
				return importer.RemoteChecksumTableBySQL(ctx, se, plan, logger)
			},
		)
		if kerneltype.IsClassic() {
			failpoint.Inject("skipPostProcessAlterTableMode", func() {
				failpoint.Return(err)
			})
			// log error instead of raise error to avoid user rerun task,
			// clean up will alter table mode to normal finally.
			err2 := ddl.AlterTableMode(domain.GetDomain(se).DDLExecutor(), se, model.TableModeNormal, p.taskMeta.Plan.DBID, p.taskMeta.Plan.TableInfo.ID)
			if err2 != nil {
				callLog.Warn("alter table mode to normal failure", zap.Error(err2))
			}
		}
		return err
	})
}
